# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations

import copy
from pathlib import Path
from typing import TypedDict

import localstack.services.cloudformation.provider_utils as util
from localstack.services.cloudformation.resource_provider import (
    OperationStatus,
    ProgressEvent,
    ResourceProvider,
    ResourceRequest,
)


class LambdaEventSourceMappingProperties(TypedDict):
    FunctionName: str | None
    AmazonManagedKafkaEventSourceConfig: AmazonManagedKafkaEventSourceConfig | None
    BatchSize: int | None
    BisectBatchOnFunctionError: bool | None
    DestinationConfig: DestinationConfig | None
    DocumentDBEventSourceConfig: DocumentDBEventSourceConfig | None
    Enabled: bool | None
    EventSourceArn: str | None
    FilterCriteria: FilterCriteria | None
    FunctionResponseTypes: list[str] | None
    Id: str | None
    MaximumBatchingWindowInSeconds: int | None
    MaximumRecordAgeInSeconds: int | None
    MaximumRetryAttempts: int | None
    ParallelizationFactor: int | None
    Queues: list[str] | None
    ScalingConfig: ScalingConfig | None
    SelfManagedEventSource: SelfManagedEventSource | None
    SelfManagedKafkaEventSourceConfig: SelfManagedKafkaEventSourceConfig | None
    SourceAccessConfigurations: list[SourceAccessConfiguration] | None
    StartingPosition: str | None
    StartingPositionTimestamp: float | None
    Topics: list[str] | None
    TumblingWindowInSeconds: int | None


class OnFailure(TypedDict):
    Destination: str | None


class DestinationConfig(TypedDict):
    OnFailure: OnFailure | None


class Filter(TypedDict):
    Pattern: str | None


class FilterCriteria(TypedDict):
    Filters: list[Filter] | None


class SourceAccessConfiguration(TypedDict):
    Type: str | None
    URI: str | None


class Endpoints(TypedDict):
    KafkaBootstrapServers: list[str] | None


class SelfManagedEventSource(TypedDict):
    Endpoints: Endpoints | None


class AmazonManagedKafkaEventSourceConfig(TypedDict):
    ConsumerGroupId: str | None


class SelfManagedKafkaEventSourceConfig(TypedDict):
    ConsumerGroupId: str | None


class ScalingConfig(TypedDict):
    MaximumConcurrency: int | None


class DocumentDBEventSourceConfig(TypedDict):
    CollectionName: str | None
    DatabaseName: str | None
    FullDocument: str | None


REPEATED_INVOCATION = "repeated_invocation"


class LambdaEventSourceMappingProvider(ResourceProvider[LambdaEventSourceMappingProperties]):
    TYPE = "AWS::Lambda::EventSourceMapping"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[LambdaEventSourceMappingProperties],
    ) -> ProgressEvent[LambdaEventSourceMappingProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/Id

        Required properties:
          - FunctionName

        Create-only properties:
          - /properties/EventSourceArn
          - /properties/StartingPosition
          - /properties/StartingPositionTimestamp
          - /properties/SelfManagedEventSource
          - /properties/AmazonManagedKafkaEventSourceConfig
          - /properties/SelfManagedKafkaEventSourceConfig

        Read-only properties:
          - /properties/Id

        IAM permissions required:
          - lambda:CreateEventSourceMapping
          - lambda:GetEventSourceMapping

        """
        model = request.desired_state
        lambda_client = request.aws_client_factory.lambda_

        params = copy.deepcopy(model)
        if tags := params.get("Tags"):
            transformed_tags = {}
            for tag_definition in tags:
                transformed_tags[tag_definition["Key"]] = tag_definition["Value"]
            params["Tags"] = transformed_tags

        response = lambda_client.create_event_source_mapping(**params)
        model["Id"] = response["UUID"]
        model["EventSourceMappingArn"] = response["EventSourceMappingArn"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def read(
        self,
        request: ResourceRequest[LambdaEventSourceMappingProperties],
    ) -> ProgressEvent[LambdaEventSourceMappingProperties]:
        """
        Fetch resource information

        IAM permissions required:
          - lambda:GetEventSourceMapping
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[LambdaEventSourceMappingProperties],
    ) -> ProgressEvent[LambdaEventSourceMappingProperties]:
        """
        Delete a resource

        IAM permissions required:
          - lambda:DeleteEventSourceMapping
          - lambda:GetEventSourceMapping
        """
        model = request.desired_state
        lambda_client = request.aws_client_factory.lambda_

        lambda_client.delete_event_source_mapping(UUID=model["Id"])

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )

    def update(
        self,
        request: ResourceRequest[LambdaEventSourceMappingProperties],
    ) -> ProgressEvent[LambdaEventSourceMappingProperties]:
        """
        Update a resource

        IAM permissions required:
          - lambda:UpdateEventSourceMapping
          - lambda:GetEventSourceMapping
        """
        current_state = request.previous_state
        model = request.desired_state
        lambda_client = request.aws_client_factory.lambda_

        params = util.select_attributes(
            model=model,
            params=[
                "FunctionName",
                "Enabled",
                "BatchSize",
                "FilterCriteria",
                "MaximumBatchingWindowInSeconds",
                "DestinationConfig",
                "MaximumRecordAgeInSeconds",
                "BisectBatchOnFunctionError",
                "MaximumRetryAttempts",
                "ParallelizationFactor",
                "SourceAccessConfigurations",
                "TumblingWindowInSeconds",
                "FunctionResponseTypes",
                "ScalingConfig",
                "DocumentDBEventSourceConfig",
            ],
        )
        lambda_client.update_event_source_mapping(UUID=current_state["Id"], **params)

        model["Id"] = current_state["Id"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
            custom_context=request.custom_context,
        )
